feat(modules): async modules#1920
Conversation
Greptile SummaryThis PR introduces a first-class async programming model for modules: an
Confidence Score: 2/5Not safe to merge: _logging_task_factory is broken on Python ≥ 3.10, making the entire async task system non-functional. A single P0 finding caps the score at 2. The asyncio.Task(loop=loop) call in _logging_task_factory raises TypeError on any Python ≥ 3.10, which is the actively supported range. This factory is set on every module's event loop via get_loop(), so every coroutine scheduled by spawn(), process_observable(), or _start_main() would fail to create a task. dimos/core/module.py — specifically _logging_task_factory at line 718 Important Files Changed
Sequence DiagramsequenceDiagram
participant C as Sync Caller
participant W as arpc Wrapper
participant EL as Module Event Loop
participant P as AsyncSpecProxy
participant TP as ThreadPoolExecutor
participant RL as Remote Module Loop
C->>W: handle_x(msg) from RPC dispatcher
W->>W: get_running_loop raises, running=None
W->>EL: run_coroutine_threadsafe(coro)
W->>W: future.result() blocks caller
EL->>EL: await coro
EL-->>C: result returned
EL->>P: await self._ref.remote_method(x)
P->>TP: run_in_executor(None, RpcCall)
TP->>RL: run_coroutine_threadsafe(coro)
RL-->>TP: result
TP-->>EL: Future resolved
C->>EL: start - run_coroutine_threadsafe gen anext
EL->>EL: setup code then yield
EL-->>C: setup done, start returns
C->>EL: stop - run_coroutine_threadsafe gen anext
EL->>EL: teardown then StopAsyncIteration
EL-->>C: teardown done, stop returns
Reviews (4): Last reviewed commit: "feat(modules): async modules" | Re-trigger Greptile |
02b6e09 to
079c62a
Compare
|
Want your agent to iterate on Greptile's feedback? Try greploops. |
c87457c to
c9a4d56
Compare
c9a4d56 to
4caeafa
Compare
Problem
Having so many threads in modules makes code hard to write.
Closes DIM-812
Solution
self._loopdaemon-thread asyncio loop, removing the need for locks on per-instance state.async def handle_xfor everyx: In[T]; each handler gets a dedicated dispatcher task with LATEST-only semantics (drops intermediate messages, guarantees the most recent value is eventually processed).@arpcdecorator: async RPC body interchangeable with@rpc(same discovery + RPC machinery).self.spawn(coro)replaces barerun_coroutine_threadsafeand routes unhandled exceptions to the module logger instead of swallowing them in an unread Future.self.process_observable(obs, async_handler)bridges rxpy observables ontoself._loop.async def main(self)async-generator-with-one-yieldcollapses pairedstart()/stop()resource setup/teardown into a single visually-adjacent block.Breaking Changes
None.
How to Test
Contributor License Agreement